package org.codehaus.activemq.service.boundedvm;

import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
import java.util.Collection;
import java.util.Iterator;
import javax.jms.JMSException;
import org.codehaus.activemq.broker.BrokerClient;
import org.codehaus.activemq.filter.AndFilter;
import org.codehaus.activemq.filter.Filter;
import org.codehaus.activemq.filter.FilterFactory;
import org.codehaus.activemq.filter.FilterFactoryImpl;
import org.codehaus.activemq.filter.NoLocalFilter;
import org.codehaus.activemq.message.ActiveMQDestination;
import org.codehaus.activemq.message.ActiveMQMessage;
import org.codehaus.activemq.message.ConsumerInfo;
import org.codehaus.activemq.message.MessageAck;
import org.codehaus.activemq.message.util.MemoryBoundedQueue;
import org.codehaus.activemq.message.util.MemoryBoundedQueueManager;
import org.codehaus.activemq.service.MessageContainer;
import org.codehaus.activemq.service.MessageContainerManager;

public class TransientTopicBoundedMessageManager
  implements MessageContainerManager
{
  private MemoryBoundedQueueManager queueManager;
  private ConcurrentHashMap containers;
  private FilterFactory filterFactory;
  private SynchronizedBoolean started;

  public TransientTopicBoundedMessageManager(MemoryBoundedQueueManager mgr)
  {
    this.queueManager = mgr;
    this.containers = new ConcurrentHashMap();
    this.filterFactory = new FilterFactoryImpl();
    this.started = new SynchronizedBoolean(false);
  }

  public void start()
    throws JMSException
  {
    Iterator i;
    if (this.started.commit(false, true))
      for (i = this.containers.values().iterator(); i.hasNext(); ) {
        TransientTopicBoundedMessageContainer container = (TransientTopicBoundedMessageContainer)i.next();
        container.start();
      }
  }

  public void stop()
    throws JMSException
  {
    Iterator i;
    if (this.started.commit(true, false))
      for (i = this.containers.values().iterator(); i.hasNext(); ) {
        TransientTopicBoundedMessageContainer container = (TransientTopicBoundedMessageContainer)i.next();
        container.stop();
      }
  }

  public void addMessageConsumer(BrokerClient client, ConsumerInfo info)
    throws JMSException
  {
    if (info.getDestination().isTopic()) {
      TransientTopicBoundedMessageContainer container = (TransientTopicBoundedMessageContainer)this.containers.get(client);

      if (container == null) {
        MemoryBoundedQueue queue = this.queueManager.getMemoryBoundedQueue(client.toString());
        container = new TransientTopicBoundedMessageContainer(client, queue);
        this.containers.put(client, container);
        if (this.started.get()) {
          container.start();
        }
      }
      container.addConsumer(createFilter(info), info);
    }
  }

  public void removeMessageConsumer(BrokerClient client, ConsumerInfo info)
    throws JMSException
  {
    if (info.getDestination().isTopic()) {
      TransientTopicBoundedMessageContainer container = (TransientTopicBoundedMessageContainer)this.containers.get(client);

      if (container != null) {
        container.removeConsumer(info);
        if (container.isInactive()) {
          this.containers.remove(client);
          container.close();
        }
      }
    }
  }

  public void deleteSubscription(String clientId, String subscriberName)
    throws JMSException
  {
  }

  public void sendMessage(BrokerClient client, ActiveMQMessage message)
    throws JMSException
  {
    Iterator i;
    if ((message != null) && (message.getJMSActiveMQDestination().isTopic()))
      for (i = this.containers.values().iterator(); i.hasNext(); ) {
        TransientTopicBoundedMessageContainer container = (TransientTopicBoundedMessageContainer)i.next();
        container.targetAndDispatch(message.shallowCopy());
      }
  }

  public void acknowledgeMessage(BrokerClient client, MessageAck ack)
    throws JMSException
  {
  }

  public void acknowledgeTransactedMessage(BrokerClient client, String transactionId, MessageAck ack)
    throws JMSException
  {
  }

  public void redeliverMessage(BrokerClient client, MessageAck ack)
    throws JMSException
  {
  }

  public void poll()
    throws JMSException
  {
  }

  public void commitTransaction(BrokerClient client, String transactionId)
    throws JMSException
  {
  }

  public void rollbackTransaction(BrokerClient client, String transactionId)
  {
  }

  public MessageContainer getContainer(String physicalName)
  {
    return null;
  }

  protected Filter createFilter(ConsumerInfo info)
    throws JMSException
  {
    Filter filter = this.filterFactory.createFilter(info.getDestination(), info.getSelector());
    if (info.isNoLocal()) {
      filter = new AndFilter(filter, new NoLocalFilter(info.getClientId()));
    }
    return filter;
  }
}